浅入浅出 NIO

引子

本文从最基本的 IO 出发,引出零拷贝技术,继而发散到用到零拷贝技术的 NIO,谈及 NIO,就必须谈一谈著名的网络通信框架 Netty 了,而 Netty 又是 metaQ、HSF等熟知技术栈的基础,本文旨在以 NIO 为核心,辐射到其相关的知识点。

Zero Copy

这一部分基本上参考:深入剖析Linux IO原理和几种零拷贝机制的实现

预读知识

物理内存和虚拟内存

物理内存

物理内存指通过物理内存条而获得的内存空间。

虚拟内存

虚拟内存是计算机系统内存管理的一种技术。 它使得应用程序认为它拥有连续的可用的内存(一个连续完整的地址空间)。而实际上,虚拟内存通常是被分隔成多个物理内存碎片,还有部分暂时存储在外部磁盘存储器上,在需要时进行数据交换,加载到物理内存中来。 目前,大多数操作系统都使用了虚拟内存,如 Windows 系统的虚拟内存、Linux 系统的交换空间等等。

离开进程谈虚拟内存没有任何意义,不同进程里的同一个虚拟地址指向的物理地址是不一样的。每个用户进程维护了一个单独的页表(Page Table),虚拟内存和物理内存就是通过这个页表实现地址空间的映射的,页表(Page Table)里面的数据由操作系统维护。

引入虚拟内存的好处

在进程和物理内存之间,加了一层虚拟内存的概念,好处有:

  1. 提供更大的地址空间,因为虚拟内存还可以放在磁盘上或者寄存器中,而物理内存并不行,而且虚拟地址空间是连续的,我们不需要操心具体是如何存放的,操作系统会帮我们映射好;
  2. 安全性更好,虚拟内存设有读写属性,并且不同进程互不影响;
  3. 可以懒加载,只有在需要读相应的文件的时候,才将它真正的从磁盘上加载到内存中来,而在内存吃紧的时候又可以将这部分内存清空掉,提高物理内存利用效率,并且所有这些对应用程序是都透明的;
  4. 可以共享内存,动态库只需要在内存中存一份就够了,然后将它映射到不同进程的虚拟地址空间中,让进程觉得自己独占了这个文件。进程间的内存共享也可以通过映射同一块物理内存到进程的不同虚拟地址空间来实现共享。

Tip:我们后文讲的,都是虚拟内存哦。

内核空间和用户空间

为了避免用户直接操作内核「可以操作一切,牛逼得很」,保证内核安全,所以将虚拟内存划分为用户空间和内核空间,进程在访问到这两个空间的时候需要进行状态的转变(内核态、用户态)。

像我们在 jvm 中谈到的,堆、栈、方法区等等都是默认是用户空间,因为我们可以直接访问的到。

内核态 & 用户态

内核态可以执行任意命令,调用系统的一切资源,而用户态只能执行简单的运算,不能直接调用系统资源。用户态必须通过系统接口(System Call),才能向内核发出指令。比如,当用户进程启动一个 bash 时,它会通过 getpid() 对内核的 pid 服务发起系统调用,获取当前用户进程的 ID;当用户进程通过 cat 命令查看主机配置时,它会对内核的文件子系统发起系统调用。

1

I/O 读写方式

CPU 轮询

CPU 对 I/O 端口进行不断地检测,直到数据准备好了,这个就很吃 cpu,pass!

I/O 中断

在 DMA 技术出现之前,应用程序与磁盘之间的 I/O 操作都是通过 CPU 的中断完成的。每次用户进程读取磁盘数据时,都需要 CPU 中断,然后发起 I/O 请求等待数据读取和拷贝完成,等数据准备完毕之后,发起 I/O 中断信号,提醒 CPU 数据已经准备好了。

2

  1. 用户进程向 CPU 发起 read 系统调用读取数据,由用户态切换为内核态,然后一直阻塞等待数据的返回。
  2. CPU 在接收到指令以后对磁盘发起 I/O 请求,将磁盘数据先放入磁盘控制器缓冲区。
  3. 数据准备完成以后,磁盘向 CPU 发起 I/O 中断。
  4. CPU 收到 I/O 中断以后将磁盘缓冲区中的数据拷贝到内核缓冲区,然后再从内核缓冲区拷贝到用户缓冲区。
  5. 用户进程由内核态切换回用户态,解除阻塞状态,然后等待 CPU 的下一个执行时间钟。

DMA传输原理

DMA(Direct Memory Access),又称直接内存存取,也就是通过硬件直接访问主内存,不需要通过 cpu,有点像协处理器,目前大多数的硬件都支持 DMA,例如网卡、显卡、声卡、磁盘等等。

有了 DMA,CPU就不用去处理将数据从磁盘缓冲区读到内核缓冲区「这个过程是很耗时的」了,直接交由 DMA 操作就行了。

3

建立在 DMA 传输上的读写的一般步骤

在 Linux 系统中,传统的访问方式是通过 write() 和 read() 两个系统调用实现的,通过 read() 函数读取文件到到缓存区中,然后通过 write() 方法把缓存中的数据输出到网络端口。

4

具体步骤如下:

  1. 用户空间调用 read() 方法,会触发系统调用,进程由用户态转为内核态;
  2. 然后 DMA 会将磁盘缓冲区的数据加载到内核缓冲区中;
  3. CPU 会将内核缓冲区的数据 COPY 到用户缓冲区中,此时进程由内核态转为用户态;
  4. 然后用户空间调用 write() 方法,会触发系统调用,CPU 会将用户缓冲区的数据 COPY 到 Socket 缓冲区中「在内核空间中」,此时进程由用户态转成内核态;
  5. 然后 Socket缓冲区中的数据会通过 DMA 的方式 COPY到网卡处,传送完成;
  6. 此时 CPU 会再次由内核态切回到用户态,方便下一次的工作。

由此可知,一共有 4 次拷贝过程,其中 2 次 CPU 参与工作, 2 次DMA参与工作,最要命的是有 4 次的上下文切换。

自此,我们的预读知识告一段落,这里我们已经知道了普通的读写过程,切换上下文的次数和拷贝数据的次数很多,所以我们急需改进,由此,零拷贝技术运应而生。

零拷贝技术

零拷贝技术是指在计算机执行操作时,减少 CPU 将数据从一个内存区域复制到另一个内存区域的次数。

零拷贝带来的显而易见的好处就是可以减少上下文切换「用户空间和内核空间之间切换导致的内核态和用户态之间的切换」以及 CPU 的拷贝次数。

实现零拷贝用到的最主要技术是 DMA 数据传输技术内存区域映射技术。由此,我们介绍四种零拷贝的实现思路:

  1. 用户态直接 I/O:应用程序可以直接访问硬件存储,操作系统内核只是辅助数据传输。这种方式依旧存在用户空间和内核空间的上下文切换,硬件上的数据直接拷贝至了用户空间,不经过内核空间。因此,直接 I/O 不存在内核空间缓冲区和用户空间缓冲区之间的数据拷贝。

  2. mmap + write:直接将内核空间中的内核缓冲区和用户空间中的用户缓冲区做一个地址映射,这样就省去了在 read 过程中将内核缓冲区的数据 COPY 到用户缓冲区中。很适合文件的修改读写

  3. sendfile:不仅将 CPU 参与的拷贝次数减少了,而且直接减少了上下文切换的次数,因为在用户缓冲区根本就不会有数据,当然坏处也很明显,就是无法修改数据,只能进行传输,所以很适合大文件的传送

    当然了,还会讲到 sendfile 的升级版。

  4. splice:就是在内核缓冲区和Socket缓冲区之间建立管道。

用户态直接I/O

进程直接在用户态访问硬件设备「这个我也不知道是怎么做到的…」,这类进程称为自缓存应用程序

直接 I/O 访问文件方式可以减少 CPU 的使用率以及内存带宽的占用,但是直接 I/O 有时候也会对性能产生负面影响。所以在使用直接 I/O 之前一定要对应用程序有一个很清醒的认识,只有在确定了设置缓冲 I/O 的开销非常巨大的情况下,才考虑使用直接 I/O。直接 I/O 经常需要跟异步 I/O 结合起来使用,因为CPU 和磁盘 I/O 之间的执行时间差距,会造成大量的浪费。

5

mmap + write

mmap,称为内存地址映射,可以将内核缓冲区的地址映射到用户缓冲区。

6

如图所示,基本和传统的读写差不多,只是减少了一次 CPU 的 COPY 工作,但是上下文切换的次数没有变化,依旧是 4 次。mmap 主要的用处是提高 I/O 性能,特别是针对大文件。对于小文件,内存映射文件反而会导致碎片空间的浪费,因为内存映射总是要对齐页边界,最小单位是 4 KB,一个 5 KB 的文件将会映射占用 8 KB 内存,也就会浪费 3 KB 内存。

sendfile

数据可以直接在内核空间内部进行 I/O 传输,从而省去了数据在用户空间和内核空间之间的来回拷贝。与 mmap 内存映射方式不同的是, sendfile 调用中 I/O 数据对用户空间是完全不可见的。也就是说,这是一次完全意义上的数据传输过程。

7

相比较于 mmap 内存映射的方式,sendfile 少了 2 次上下文切换,但是仍然有 1 次 CPU 拷贝操作。sendfile 存在的问题是用户程序不能对数据进行修改,而只是单纯地完成了一次数据传输过程。

sendfile + DMA gather copy

Linux 2.4 版本的内核对 sendfile 系统调用进行修改,为 DMA 拷贝引入了 gather 操作。它将内核空间(kernel space)的读缓冲区(read buffer)中对应的数据描述信息(内存地址、地址偏移量)记录到相应的网络缓冲区( socket buffer)中,由 DMA 根据内存地址、地址偏移量将数据批量地从读缓冲区(read buffer)拷贝到网卡设备中,这样就省去了内核空间中仅剩的 1 次 CPU 拷贝操作。

在硬件的支持下,sendfile 拷贝方式不再从内核缓冲区的数据拷贝到 socket 缓冲区,取而代之的仅仅是缓冲区文件描述符和数据长度的拷贝,这样 DMA 引擎直接利用 gather 操作将页缓存中数据打包发送到网络中即可,本质就是和虚拟内存映射的思路类似。

8

sendfile + DMA gather copy 拷贝方式同样存在用户程序不能对数据进行修改的问题,而且本身需要硬件的支持,它只适用于将数据从文件拷贝到 socket 套接字上的传输过程

splice

splice 系统调用可以在内核空间的读缓冲区(read buffer)和网络缓冲区(socket buffer)之间建立管道(pipeline),从而避免了两者之间的 CPU 拷贝操作。splice 拷贝方式也同样存在用户程序不能对数据进行修改的问题。除此之外,它使用了 Linux 的管道缓冲机制,可以用于任意两个文件描述符中传输数据,但是它的两个文件描述符参数中有一个必须是管道设备。

一句话归纳,就是直接在内核缓冲区和Socket缓存区之间建立管道。

9

总结

拷贝方式 CPU拷贝次数 DMA拷贝次数 系统调用 上下文切换次数
传统方式(read + write) 2 2 read + write 4
内存映射(mmap + write) 1 2 mmap + write 4
sendfile 1 2 sendfile 2
sendfile + DMA gather copy 0 2 sendfile 2
splice 0 2 splice 2

NIO

编程新说,就是这篇文章把我拉入了 NIO 的深坑

一段一段式,个人觉得可读性很强

Java3y 跟我想的一样,文件I/O 主要使用零拷贝技术,而网络 I/O 主要使用 I/O 复用技术

简单的总结,言简意赅,可惜没有代码demo

这里,我分三部分讲:

  1. 先介绍一下 NIO 的基本概念,主要是三剑客「Selector、Buffer、Channel」;
  2. 然后讲一下 NIO 中用到的零拷贝技术;
  3. 最后再谈一下 NIO 中最为重要的 SocketChannel 用到的 I/O 复用技术。

我建议学习 NIO 的路线是:

  1. 先系统了解各个概念,有什么:Java NIO 系统教程,总共有 17 讲
  2. 然后看 Java3y 的总结,个人觉得总结的很到位,尤其是将 NIO 分解成两大块技术「零拷贝技术 & I/O 复用技术」,虽然他没有细致的谈到文件 I/O 使用的技术「这是一大缺陷」,但是有在重点强调 Selector 带来的同步非阻塞的 I/O 读写方式,这点是很好的,链接:如何学习Java的NIO? - Java3y的回答 - 知乎
  3. 可以再好好补充下这两项技术的具体应用,首先是文件 I/O 中的零拷贝技术,主要就是通过 FileChannel 中的 map() 使用了 mmap,transferTo() & transferFrom() 使用了 sendfile。「为了更好地理解流程,可以简单的将 Buffer 看作是用户缓冲区,Channel 看作是内核缓冲区」,尤其是 mmap 的在这里面的使用,可以参见这个博客里面的 demo,浅显易懂,下面我也会具体讲的:mmap在 NIO 中的具体使用
  4. 其次就是网络 I/O 中使用到的 I/O 复用技术了,这个在很多地方都有提及到,甚至大家认为 NIO 是 (Non-Blocking I/O)的意思,可见大家对 NIO 中这项技术的重视程度「虽然我认为 NIO 是 New I/O 的意思,因为零拷贝的实现也很牛逼啊!」,这里呢,还是建议大家看学习路线的第一个链接,里面其实也有许多小 demo的,当然了,如果想看具体的套接字传输的 demo,我推荐:编程新说,就是这篇文章带我入的坑,里面还提及了 AIO 哦

基本概念

这玩意是什么

Java NIO是 Java 1.4之后新出的一套IO接口,这里的的新是相对于原有标准的Java IO和Java Networking接口。NIO提供了一种完全不同的操作方式。

Non-Blocking I/O or New I/O

New 在哪里,特点是啥

  • 标准的IO编程接口是面向字节流和字符流的。而NIO是面向通道和缓冲区的,数据总是从通道中读到buffer缓冲区内,或者从buffer写入到通道中。「知识是相通的,这块跟零拷贝技术中的用户缓冲区和内核缓冲区的形式一模一样,用户缓冲区就是 Buffer,我们可以在这里进行操作数据,而内核缓冲区数据就是 Channel,我们在这里不能去操作数据,但是可以传输数据。」这个特点在 文件I/O 和 网络 I/O 中都有很大的用处,在文件 I/O 中我已经讲过了好处了,就是可以和零拷贝那一套结合上,在网络 I/O 中,这个特点可以很好的控制数据,毕竟缓冲区相对字节字符流来说要强得多。
  • Non-Blocking I/O。我们可以进行非阻塞IO操作。比如说,单线程中从通道读取数据到buffer,同时可以继续做别的事情,当数据读取到buffer中后,线程再继续处理数据。写数据也是一样的。就是用 Selector 实现的。

三剑客「Channel、Buffer、Selector」

channel

用来运输数据的,是全双工的,双向都可以,如果觉得不好理解的话,就看成内核缓冲区就行了,所以我们在程序中想要用到数据的话必须搭配 buffer,channel 从 buffer 中读数据,相当于内核缓冲区中的数据传送到用户缓冲区,buffer 写数据至 channel,相当于用户缓冲区中的数据传送到 channel。

主要的 channel 有:

  • FileChannel 「文件 I/O 用到的」
  • DatagramChannel 「网络 I/O 用到,udp传输」
  • SocketChannel 「网络 I/O 用到,tcp传输」
  • ServerSocketChannel 「网络 I/O 用到,用于建立 tcp连接的channel」

buffer

buffer本质上就是一块内存区,可以用来写入数据,并在稍后读取出来。这块内存被NIO Buffer包裹起来,对外提供一系列的读写方便开发的接口。就是用来跟 channel 交互的,我们对数据的操作都是在这里完成的,buffer应该是三剑客中最值得讲的。

分类

Java NIO有如下具体的Buffer类型:

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

正如你看到的,Buffer的类型代表了不同数据类型,换句话说,Buffer中的数据可以是上述的基本类型;

其中最为重要的就是 ByteBuffer,在网络 I/O 中我们基本上就是使用 ByteBuffer 了,在文件 I/O 中由于可以使用零拷贝技术,所以主要用的是用到mmap的 MappedByteBuffer,聪明的你肯定已经知道怎么用这个MappedByteBuffer了,也就是直接让 MappedByteBuffer 与 FileChannel 形成内存地址映射「类比用户缓冲区和内核缓冲区做地址映射」,减少拷贝次数。

数据结构

我们来好好了解一下 buffer,先来看主要的属性

1
2
3
4
5
// Invariants: mark <= position <= limit <= capacity
private int mark = -1; // 默认 -1,标记,可以配合 mark()、reset()方法使用
private int position = 0;
private int limit;
private int capacity;
  • 容量(Capacity)

作为一块内存,buffer有一个固定的大小,叫做capacity容量。一旦buffer写满了就需要清空已读数据以便下次继续写入新的数据。

  • 位置(Position)

当写入数据到Buffer的时候需要中一个确定的位置开始,默认初始化时这个位置position为0,一旦写入了数据比如一个字节,整型数据,那么position的值就会指向数据之后的一个单元,position最大可以到capacity-1.

当从Buffer读取数据时,也需要从一个确定的位置开始。buffer从写入模式变为读取模式时,position会归零,每次读取后,position向后移动。

  • 上限(Limit)

在写模式,limit的含义是我们所能写入的最大数据量。它等同于buffer的容量。

一旦切换到读模式,limit则代表我们所能读取的最大数据量,他的值等同于写模式下position的位置。

数据读取的上限时buffer中已有的数据,也就是limit的位置(原position所指的位置)。

读模式和写模式

在读模式和写模式下,position、mark、limit的值都会变化。如果不显示调用 mark() 函数,mark值不会变,如果显式调用,那么 mark 会等于 当前的 position,如果调用 flip(),由写模式切换成读模式,此时不仅position和limit会发生变化,mark 也会变为 -1。当然了,clear 也会将 mark 置为初始状态 -1。

buffers-modes.png

1
2
3
4
5
6
7
8
9
10
11
12
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
基本用法

利用Buffer读写数据,通常遵循四个步骤:

  • 把数据写入buffer;
  • 调用flip;
  • 从Buffer中读取数据;
  • 调用buffer.clear()或者buffer.compact()

当写入数据到buffer中时,buffer会记录已经写入的数据大小。当需要读数据时,通过flip()方法把buffer从写模式调整为读模式;在读模式下,可以读取所有已经写入的数据。

当读取完数据后,需要清空buffer,以满足后续写入操作。清空buffer有两种方式:调用clear()或compact()方法。clear会清空整个buffer,compact则只清空已读取的数据,未被读取的数据会被移动到buffer的开始位置,写入位置则近跟着未读数据之后。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Buffer_Demo {
public static void main(String[] args) throws IOException {
RandomAccessFile aFile = new RandomAccessFile("file.txt", "rw");
FileChannel inChannel = aFile.getChannel();

//create buffer with capacity of 48 bytes
ByteBuffer buf = ByteBuffer.allocate(48);
// 调用 channel 的 read or write,就相当于在内核态系统调用一样
// 至于是 read 还是 write 取决于 new 的时候文件是 源文件还是目的文件
// 如果是源文件,那就是 read 到 随便一个地方
// 如果是目的文件,那就是随便将东西 write 到 目的文件
int bytesRead = inChannel.read(buf); //read into buffer.
int count = 0 ;
while (bytesRead != -1) {

buf.flip(); // 切换到读模式
count++;
System.out.println(count);
while(buf.hasRemaining()){
System.out.print((char) buf.get()); // read 1 byte at a time
}

buf.clear(); //make buffer ready for writing

// 这一行一定要在一定ByteBuffer.clear()之后,因为如果 position == limit,就代表ByteBuffer不能存入东西了,
// 因此连流终止信息都不能接受,不返回-1,而是返回 0
// 而clear会将 position 置为 0,不会有 position == limit,并且 buf 已经读完了,故会返回 -1.
// 再三提醒,调用 read() 方法,会判断是否读完了,如果读完了就会返回 -1,所以文件读取只会读一遍,但是buffer可以读多遍
bytesRead = inChannel.read(buf);
}
aFile.close();
}
}
其他一些函数的用法
  • mark() & reset()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 用于对mark属性的测试
* 调用buffer.remak()时,会将当前的position值赋给remak属性,
* 保存当前操作的状态,然后get继续执行,当调用buffer.reset()时,会将之前当前的remark值赋予position,
* 实现状态的恢复.
* flip clear ( reset mark ) rewind
* @author lenovo
*
*/
class BufferMark {

public static void main(String[] args) {
// TODO Auto-generated method stub
IntBuffer intBuffer = IntBuffer.allocate(10);
for(int i = 0;i<intBuffer.capacity();i++) {
int b = new SecureRandom().nextInt(20);
//此方法为相对方法(relative),他会导致position的变化
intBuffer.put(b);
//此方法是绝对方法(absolute),他的使用只会讲对应位置的值替换到,并不会更改position
//intBuffer.put(i,new SecureRandom().nextInt(20));

System.out.print(b+" ");
}
//翻转buffer
intBuffer.flip();
System.out.println("");
System.out.println("读取到的数据:"+intBuffer.get());
System.out.println("读取到的数据:"+intBuffer.get());
System.out.println("1:buffer position="+intBuffer.position()+" limit="+intBuffer.limit());
//标记buffer状态
intBuffer.mark();
System.out.println("2: buffer position="+intBuffer.position()+" limit="+intBuffer.limit());
System.out.println("读取到的数据:"+intBuffer.get());
System.out.println("读取到的数据:"+intBuffer.get());
System.out.println("3: buffer position="+intBuffer.position()+" limit="+intBuffer.limit());
//恢复buffer状态
intBuffer.reset();
System.out.println("读取到的数据:"+intBuffer.get());
System.out.println("读取到的数据:"+intBuffer.get());
System.out.println("4: buffer position="+intBuffer.position()+" limit="+intBuffer.limit());
/**
* 4 10 3 11 17 5 18 10 2 6
读取到的数据:4
读取到的数据:10
1: buffer position=2 limit=10
2: buffer position=2 limit=10
读取到的数据:3
读取到的数据:11
3: buffer position=4 limit=10
读取到的数据:3
读取到的数据:11
4: buffer position=4 limit=10
*/
}
}
  • buffer.get()

读取缓冲区的数据,默认按 字节 读取;

  • rewind()

Buffer.rewind()方法将position置为0,这样我们可以重复读取buffer中的数据。limit保持不变。

  • clear() & compact()

一旦我们从buffer中读取完数据,需要复用buffer为下次写数据做准备。只需要调用clear或compact方法。

clear方法会重置position为0,limit为capacity,也就是整个Buffer清空。实际上Buffer中数据并没有清空,我们只是把标记为修改了。

如果Buffer还有一些数据没有读取完,调用clear就会导致这部分数据被“遗忘”,因为我们没有标记这部分数据未读。

针对这种情况,如果需要保留未读数据,那么可以使用compact。 因此compact和clear的区别就在于对未读数据的处理,是保留这部分数据还是一起清空。

selector

Selector是Java NIO中的一个组件,用于检查一个或多个NIO Channel的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。这也是 I/O 复用技术的核心,也是整个 NIO 最核心的组件。在 I/O 复用技术再展开吧。

overview-selectors.png

用到的零拷贝技术

NIO中通过FileChannel来提供Zero-Copy的支持,分别是

  • FileChannel.map: 将文件的一部分映射到内存
  • FileChannel.transferTo: 将本Channel的文件字节转移到指定的可写Channel

mmap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 测试FileChannel的用法
*
* @author sound2gd
*
*/
public class FileChannnelTest {

public static void main(String[] args) {
File file = new File("src/com/cris/chapter15/f6/FileChannnelTest.java");
try (
// FileInputStream打开的FileChannel只能读取
FileChannel fc = new FileInputStream(file).getChannel();
// FileOutputStream打开的FileChannel只能写入
FileChannel fo = new FileOutputStream("src/com/cris/chapter15/f6/a.txt").getChannel();) {

// 将FileChannel的数据全部映射成ByteBuffer
MappedByteBuffer mbb = fc.map(MapMode.READ_ONLY, 0, file.length());
// 使用UTF-8的字符集来创建解码器
Charset charset = Charset.forName("UTF-8");
// 直接将buffer里的数据全部输出
fo.write(mbb);
mbb.clear();
// 创建解码器
CharsetDecoder decoder = charset.newDecoder();
// 使用解码器将byteBuffer转换为CharBuffer
CharBuffer decode = decoder.decode(mbb);
System.out.println(decode);
} catch (Exception e) {

}
}

}

更多详细源码解析见:https://zhuanlan.zhihu.com/p/83398714

sendfile

transferTo() 和 transferFrom() 方法的底层实现原理,这两个方法也是 java.nio.channels.FileChannel 的抽象方法,由子类 sun.nio.ch.FileChannelImpl.java 实现。transferTo() 和 transferFrom() 底层都是基于 sendfile 实现数据传输的。这里就是 Channel 之间的传输。

I/O 复用技术

其实说白了,就是一个 Selector 对应多个 Channel,可以专门用一个管理多个,就是 I/O 复用。

下面直接放一个 Socket I/O 实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package NIO.编程新说;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class NIO {

}
/**
* @author yang
* @since 2020-03-21
*/
class Client2 {

public static void main(String[] args) {
try {
for (int i = 0; i < 20; i++) {
Socket s = new Socket();
s.connect(new InetSocketAddress("127.0.0.1", 27771));
processWithNewThread(s, i);
}
} catch (IOException e) {
e.printStackTrace();
}
}

static void processWithNewThread(Socket s, int i) {
Runnable run = () -> {
try {
//睡眠随机的5-10秒,模拟数据尚未就绪
Thread.sleep((new Random().nextInt(6) + 5) * 1000);
//写1M数据,为了拉长服务器端读数据的过程
s.getOutputStream().write(prepareBytes());
//睡眠1秒,让服务器端把数据读完
Thread.sleep(1000);
s.close();
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(run).start();
}

static byte[] prepareBytes() {
byte[] bytes = new byte[1024*1024*1];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 1;
}
return bytes;
}
}

/**
* @author yang
* @since 2020-03-21
*/
class NioServer2 {

static int clientCount = 0;
static AtomicInteger counter = new AtomicInteger(0);
static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

public static void main(String[] args) {
try {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress("127.0.0.1", 27771));
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel ssc1 = (ServerSocketChannel)key.channel();
SocketChannel sc = null;
while ((sc = ssc1.accept()) != null) {
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
InetSocketAddress rsa = (InetSocketAddress)sc.socket().getRemoteSocketAddress();
System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount));
}
} else if (key.isReadable()) {
//先将“读”从感兴趣操作移出,待把数据从通道中读完后,再把“读”添加到感兴趣操作中
//否则,该通道会一直被选出来
key.interestOps(key.interestOps() & (~ SelectionKey.OP_READ));
processWithNewThread((SocketChannel)key.channel(), key);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

static void processWithNewThread(SocketChannel sc, SelectionKey key) {
Runnable run = () -> {
counter.incrementAndGet();
try {
String result = readBytes(sc);
//把“读”加进去
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.get());
sc.close();
} catch (Exception e) {
e.printStackTrace();
}
counter.decrementAndGet();
};
new Thread(run).start();
}

static String readBytes(SocketChannel sc) throws Exception {
long start = 0;
int total = 0;
int count = 0;
ByteBuffer bb = ByteBuffer.allocate(1024);
//开始读数据的时间
long begin = System.currentTimeMillis();
while ((count = sc.read(bb)) > -1) {
if (start < 1) {
//第一次读到数据的时间
start = System.currentTimeMillis();
}
total += count;
bb.clear();
}
//读完数据的时间
long end = System.currentTimeMillis();
return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs";
}

static String time() {
return sdf.format(new Date());
}
}

具体的语句含义我这里就不再单独说明了,可以移步:https://wiki.jikexueyuan.com/project/java-nio-zh/java-nio-selector.html,这里有说明一些具体函数的用法,这里我们知道思想就好了。

对输入输出流的一些看法

在写上面的一些demo的时候,突然又对输入流输出流有点困惑,到底什么时候该输入流,什么时候该输出流,又为什么需要分输出流输入流呢?

  1. 为什么要分输入输出流?

因为目的不一样,输入流是表示从外部输入到内存中去,关注的重点在字节流从哪里来,而输出流是表示将内存中的数据存储到外部去,关注的重点是字节流到哪去,分开就是为了分工明确。二者操作的对象不一样,一个是源文件「输入流」,一个是目标文件「输出流」。

  1. 输入流和输出流何时用?

只要确定了要操作的文件是源文件还是目的文件,就能很清楚的知道输入输出流应该如何选择,像最简单的 FileOutputStream 和 FileInputStream ,在使用时分别会先指定目的文件和源文件,至于 fos.write()具体写啥我们并不关心,至于 fis.read(),读到哪里去我们也并不关心。

最后,由于这篇文章也不是主要写 I/O 流的,还是该收就收吧,最后放一个大招链接,没事的时候多看看,可以加深对 I/O 流的理解。

最详尽的 I/O 流大全

简述输入输出流

极具争议的一个话题

这就是 NIO 到底是同步阻塞,还是同步非阻塞,还是异步非阻塞呢?

我看过很多答案,也看了很多解释,真的云里雾里…

我先表明我的观点,我认为是同步非阻塞的。同步非同步,我觉得看的是消息来了,你是不是按序接收,比如我们的程序在一个线程内肯定是同步的,因为他是按序进行的,再比如在 node js 中所有的语句都不是按序来的,这个就算做异步,从这个角度看,NIO 是同步的,因为 Selector 要一个一个轮询,每个有请求的I/O 都是需要按序来接收的。再来谈谈非阻塞,我觉得谈阻塞非阻塞焦点在于这个线程的状态,如果他接收消息后,自己去忙着处理,其他线程必须等你处理完 I/O 操作然后一直等着,这个就肯定是阻塞了。在 NIO 中毫无疑问是非阻塞的,因为 Selector 在接收到 I/O 读写准备好了之后,会起一个线程去处理 I/O 读写,不会让这个 Selector 去处理,其他I/O 请求也就不用阻塞等待。

在知乎上,有人这么总结:

同步和异步说的是消息的通知机制「同步就是需要线程主动询问,异步就是压根不需要线程主动询问」,阻塞和非阻塞说的是线程的状态。

我认为勉强后半句 okay,前半句多少有些勉强「不过也能说得过去,下面我会解释」,你在 node js 中谈同步异步的时候有去考虑什么消息的通知机制吗?不过话说回来,主动询问的代价就是得一个一个询问,所以肯定同步,不需要去询问,每个 I/O 请求自己去进行处理,那这个也就不存在一个一个处理请求的问题了,必然是异步了。所以我觉得,同步还是异步,看的是 I/O 请求来了是不是按序去处理的,这里肯定是按序处理,因为都是由 Selector 去接收,所以是同步。阻塞还是非阻塞,就看这个 I/O 请求有没有导致线程阻塞了,实际上是没有,因为他找了 worker 线程去处理了,所以这里是非阻塞。

再补充两句吧,同步异步我认为始终是要以顺序为中心要义的,node js中我们谈同步异步,始终都是看执行语句的顺序,这里我觉得也是一样。在这里只不过是消息的通知机制的不同带来了顺序的变化,所以我觉得因果关系还是要确定好,同步异步的概念我认为不可能用消息通知机制来衡量,我们在讲同步代码块中有提到过什么通知机制的概念吗?我们始终都是在强调执行的顺序,以此来衡量同步异步。

综合来说,NIO 就是同步非阻塞的。

至于 AIO 「jdk1.7 引入」采用的是异步非阻塞,通过我上面的方式去理解就很好理解了。采用的是回调机制,I/O 请求来了,我们根本不需要去管,也就不存在请求按序处理的情况了,直接由操作系统自己处理,等到数据okay了,直接通知我就好了,全程都是异步非阻塞的状态。

当然了 AIO 这么好为啥我们还在用 NIO 呢?我觉得有几个原因吧,一个是这个 AIO 支持的太少了,还没完全发展起来,一个是不支持 udp,只支持 tcp。

NIO底层之奇妙的三兄弟

我擦,就一点半了…明天接着写,透露下,就是 select/poll/epoll。

回来了,回来了,接着昨天的彩蛋,我们今天来讲一讲 select/poll/epoll。

其实这个 select/poll/epoll 和 零拷贝没有关系,但是 跟 NIO 紧密相连,我们知道,NIO 中总共就是使用了两个技术:I/O 复用技术「网络 I/O」和零拷贝技术「文件 I/O」,我在上面其实也有讲,NIO 中最重要的技术就是 I/O 复用技术,那它是如何实现的呢?没错,底层就是 select/poll/epoll。

这三兄弟是啥

参考:

https://zhuanlan.zhihu.com/p/95872805

https://mp.weixin.qq.com/s?__biz=MzUyNzgyNzAwNg==&mid=2247483925&idx=1&sn=1ac3e863594745c7466b0e88a688b203&scene=21#wechat_redirect

select、poll、epoll都是I/O多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个文件描述符,一旦某个描述符就绪(读就绪或写就绪),能够通知程序进行相应的读写操作 。

但是,select,poll,epoll本质还是同步I/O(I/O多路复用本身就是同步IO)的范畴,因为它们都需要在读写事件就绪后线程自己进行读写,读写的过程阻塞的。而异步I/O的实现是系统会把负责把数据从内核空间拷贝到用户空间,无需线程自己再进行阻塞的读写,内核已经准备完成。

Tip:linux中 socket 的 fd 是什么?

这个FD就是File Discriptor 中文翻译为文件描述符

Socket起源于unix,Unix中把所有的资源都看作是文件,包括设备,比如网卡、打印机等等,所以,针对Socket通信,我们在使用网卡,网卡又处理N多链接,每个链接都需要一个对应的描述,也就是惟一的ID,即对应的文件描述符。简单点说也就是 int fd = socket(AF_INET,SOCK_STREAM, 0); 函数socket()返回的就是这个描述符。在传输中我们都要使用这个惟一的ID来确定要往哪个链接上传输数据。

Select

API简介

linux系统中/usr/include/sys/select.h文件中对select方法的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* fd_set for select and pselect.  */
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;

/* Check the first NFDS descriptors each in READFDS (if not NULL) for read
readiness, in WRITEFDS (if not NULL) for write readiness, and in EXCEPTFDS
(if not NULL) for exceptional conditions. If TIMEOUT is not NULL, time out
after waiting the interval specified therein. Returns the number of ready
descriptors, or -1 for errors.

This function is a cancellation point and therefore not marked with
__THROW. */
extern int select (int __nfds, fd_set *__restrict __readfds,
fd_set *__restrict __writefds,
fd_set *__restrict __exceptfds,
struct timeval *__restrict __timeout);

int __nfdsfd_set中最大的描述符+1,当调用select时,内核态会判断fd_set中描述符是否就绪,__nfds告诉内核最多判断到哪一个描述符。

readfds、writefds、__exceptfds都是结构体fd_set,fd_set可以看作是一个描述符的集合。 select函数中存在三个fd_set集合,分别代表三种事件,readfds表示读描述符集合,writefds表示读描述符集合,exceptfds表示异常描述符集合。当对应的fd_set = NULL时,表示不监听该类描述符。

timeval __timeout用来指定select的工作方式,即当文件描述符尚未就绪时,select是永远等下去,还是等待一定的时间,或者是直接返回

函数返回值int表示: 就绪描述符的数量,如果为-1表示产生错误 。

运行机制

Select会将全部fd_set『文件描述符』从用户空间拷贝到内核空间,并注册回调函数, 在内核态空间来判断每个请求是否准备好数据 。select在没有查询到有文件描述符就绪的情况下,将一直阻塞(select是一个阻塞函数)。如果有一个或者多个描述符就绪,select 会去轮询整个 fd_set「无差别轮询」。

Select的缺陷

  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,fd越多开销则越大;
  • 每次调用select,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
  • select支持的文件描述符数量有限,默认是1024。参见/usr/include/linux/posix_types.h中的定义:
1
# define __FD_SETSIZE 1024

poll

API简介

linux系统中/usr/include/sys/poll.h文件中对poll方法的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/* Data structure describing a polling request.  */
struct pollfd
{
int fd; /* File descriptor to poll. */
short int events; /* Types of events poller cares about. */
short int revents; /* Types of events that actually occurred. */
};

/* Poll the file descriptors described by the NFDS structures starting at
FDS. If TIMEOUT is nonzero and not -1, allow TIMEOUT milliseconds for
an event to occur; if TIMEOUT is -1, block until an event occurs.
Returns the number of file descriptors with events, zero if timed out,
or -1 for errors.

This function is a cancellation point and therefore not marked with
__THROW. */
extern int poll (struct pollfd *__fds, nfds_t __nfds, int __timeout);

__fds参数时Poll机制中定义的结构体pollfd,用来指定一个需要监听的描述符。结构体中fd为需要监听的文件描述符,events为需要监听的事件类型,而revents为经过poll调用之后返回的事件类型,在调用poll的时候,一般会传入一个pollfd的结构体数组,数组的元素个数表示监控的描述符个数。

__nfds__timeout参数都和Select机制中的同名参数含义类似

运行机制

poll的实现和select非常相似,只是描述fd集合的方式不同,poll使用pollfd 结构代替select的fd_set(网上讲:类似于位图)结构,其他的本质上都差不多。所以Poll机制突破了Select机制中的文件描述符数量最大为1024的限制

Poll的缺陷

Poll机制相较于Select机制中,解决了文件描述符数量上限为1024的缺陷。但另外两点缺陷依然存在:

  • 每次调用poll,都需要把fd集合从用户态拷贝到内核态,fd越多开销则越大;
  • 每次调用poll,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

epoll

Epoll在Linux2.6内核正式提出,是基于事件驱动的I/O方式。相对于select来说,epoll没有描述符个数限制;使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,通过内存映射,使其在用户空间也可直接访问,省去了拷贝带来的资源消耗。

API简介

linux系统中/usr/include/sys/epoll.h文件中有如下方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* Creates an epoll instance.  Returns an fd for the new instance.
The "size" parameter is a hint specifying the number of file
descriptors to be associated with the new instance. The fd
returned by epoll_create() should be closed with close(). */
extern int epoll_create (int __size) __THROW;

/* Manipulate an epoll instance "epfd". Returns 0 in case of success,
-1 in case of error ( the "errno" variable will contain the
specific error code ) The "op" parameter is one of the EPOLL_CTL_*
constants defined above. The "fd" parameter is the target of the
operation. The "event" parameter describes which events the caller
is interested in and any associated user data. */
extern int epoll_ctl (int __epfd, int __op, int __fd,
struct epoll_event *__event) __THROW;

/* Wait for events on an epoll instance "epfd". Returns the number of
triggered events returned in "events" buffer. Or -1 in case of
error with the "errno" variable set to the specific error code. The
"events" parameter is a buffer that will contain triggered
events. The "maxevents" is the maximum number of events to be
returned ( usually size of "events" ). The "timeout" parameter
specifies the maximum wait time in milliseconds (-1 == infinite).

This function is a cancellation point and therefore not marked with
__THROW. */
extern int epoll_wait (int __epfd, struct epoll_event *__events,
int __maxevents, int __timeout);

epoll_create函数:创建一个epoll实例并返回,该实例可以用于监控__size个文件描述符

epoll_ctl函数:向epoll中注册事件,该函数如果调用成功返回0,否则返回-1。用红黑树来管理这些事件,每新增一个事件,就新增一个红黑树节点。

  • __epfd为epoll_create返回的epoll实例
  • __op表示要进行的操作
  • __fd为要进行监控的文件描述符
  • __event要监控的事件

epoll_wait函数:类似与select机制中的select函数、poll机制中的poll函数,等待内核返回监听描述符的事件产生。该函数返回已经就绪的事件的数量,如果为-1表示出错。

  • __epfd为epoll_create返回的epoll实例
  • __events数组为 epoll_wait要返回的已经产生的事件集合
  • maxevents为希望返回的最大的事件数量(通常为events的大小)
  • __timeout和select、poll机制中的同名参数含义相同

运行机制

image-20200617104215978

epoll 将等待队列和就绪队列分开,红黑树存储等待队列中的进程,而一旦 I/O 就绪就将红黑树节点转移到链表,所以只要看到链表中的节点,就能知道哪些事件准备好了。

epoll操作过程需要上述三个函数,也正是通过三个函数完成Select机制中一个函数完成的事情,解决了Select机制的三大缺陷。epoll的工作机制更为复杂,我们就解释一下,它是如何解决Select机制的三大缺陷的。

  1. 对于第一个缺点,epoll的解决方案是:它的fd是共享在用户态和内核态之间的,所以可以不必进行从用户态到内核态的一个拷贝,大大节约系统资源。至于如何做到用户态和内核态,大家可以查一下“mmap”,它是一种内存映射的方法。
  2. 对于第二个缺点,epoll的解决方案不像select或poll一样每次都把当前线程轮流加入fd对应的设备等待队列中,而只在epoll_ctl时把当前线程挂一遍(这一遍必不可少),并为每个fd指定一个回调函数。当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表。那么当我们调用epoll_wait时,epoll_wait只需要检查链表中是否有存在就绪的fd即可,效率非常可观
  3. 对于第三个缺点,fd数量的限制,也只有Select存在,Poll和Epoll都不存在。由于Epoll机制中只关心就绪的fd,它相较于Poll需要关心所有fd,在连接较多的场景下,效率更高。在1GB内存的机器上大约是10万左右,一般来说这个数目和系统内存关系很大。

工作模式

相较于Select和Poll,Epoll内部还分为两种工作模式: LT水平触发(level trigger)ET边缘触发(edge trigger)

  • LT模式: 默认的工作模式,即当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序可以不立即处理该事件;事件会被放回到就绪链表中,下次调用epoll_wait时,会再次通知此事件。
  • ET模式: 当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应并通知此事件。

由于上述两种工作模式的区别,LT模式同时支持block和no-block socket两种,而ET模式下仅支持no-block socket。即epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个fd的阻塞I/O操作把多个处理其他文件描述符的任务饿死。ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。

Epoll的优点

  • 文件描述符数量不再受限。
  • epoll 是线程安全的,而 select 和 poll 不是。
  • epoll 内部使用了 mmap 共享了用户和内核的部分空间,避免了数据的来回拷贝。
  • select、poll采用轮询的方式来检查文件描述符是否处于就绪态,而epoll采用回调机制。造成的结果就是,随着fd的增加,select和poll的效率会线性降低,而epoll不会受到太大影响,除非活跃的socket很多。
  • epoll 基于事件驱动,epoll_ctl 注册事件并注册 callback 回调函数,epoll_wait 只返回发生的事件避免了像 select 和 poll 对事件的整个轮询操作。

虽然epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。

三者比较

1. 用户态将文件描述符传入内核的方式

  • select:创建3个文件描述符集并拷贝到内核中,分别监听读、写、异常动作。这里受到单个进程可以打开的fd数量限制,默认是1024。
  • poll:将传入的struct pollfd结构体数组拷贝到内核中进行监听。
  • epoll:执行epoll_create会在内核的高速cache区中建立一颗红黑树以及就绪链表(该链表存储已经就绪的文件描述符)。接着用户执行的epoll_ctl函数添加文件描述符会在红黑树上增加相应的结点。

2. 内核态检测文件描述符读写状态的方式

  • select:采用轮询方式,遍历所有fd,最后返回一个描述符读写操作是否就绪的mask掩码,根据这个掩码给fd_set赋值。
  • poll:同样采用轮询方式,查询每个fd的状态,如果就绪则在等待队列中加入一项并继续遍历。
  • epoll:采用回调机制。在执行epoll_ctl的add操作时,不仅将文件描述符放到红黑树上,而且也注册了回调函数,内核在检测到某文件描述符可读/可写时会调用回调函数,该回调函数将文件描述符放在就绪链表中。

3. 找到就绪的文件描述符并传递给用户态的方式

  • select:将之前传入的fd_set拷贝传出到用户态并返回就绪的文件描述符总数。用户态并不知道是哪些文件描述符处于就绪态,需要遍历来判断。
  • poll:将之前传入的fd数组拷贝传出用户态并返回就绪的文件描述符总数。用户态并不知道是哪些文件描述符处于就绪态,需要遍历来判断。
  • epoll:epoll_wait只用观察就绪链表中有无数据即可,最后将链表的数据返回给数组并返回就绪的数量。内核将就绪的文件描述符放在传入的数组中,所以只用遍历依次处理即可。这里返回的文件描述符是通过mmap让内核和用户空间共享同一块内存实现传递的,减少了不必要的拷贝。

4. 重复监听的处理方式

  • select:将新的监听文件描述符集合拷贝传入内核中,继续以上步骤。
  • poll:将新的struct pollfd结构体数组拷贝传入内核中,继续以上步骤。
  • epoll:无需重新构建红黑树,直接沿用已存在的即可。

NIO 如何使用的这三兄弟

具体的 demo 分析可以看:https://blog.csdn.net/nieyanshun_me/article/details/52397153

在 windows 下,只支持 select,不支持 epoll,而 linux 2.6是支持 epoll的,从 demo 中可以知道 selector.select(); 这一步调用了 epoll 系统调用「复杂度 O(1)」,只要客户端准备好了数据,就会去告知 Selector 数据准备好了,可以进行 I/O 读写了,而不是像 select 「复杂度 O(n)」,知道有数据准备好了之后,把所有的连接全部轮一遍,看看是谁准备好了…费时费力。所以说啊,还是事务驱动好啊,就跟我们学习一样,还是一边面试一边学某些知识会更快一点,select 就像我们自己看书一样,知道自己有不足了,但是不知道哪块不行,于是就把所有知识全部复习一遍,epoll 就相当于面试的时候面试官直接告诉你哪块菜的不行,然后你就能对症下药并且最快效率的补齐自己的短板。

从 NIO 看 Netty

在 NIO 一节中,我主要是介绍了下其文件 IO 用到的零拷贝技术以及 IO 复用技术,这一小节,我会从 NIO 的角度,一窥著名的 Netty。

Netty 产生的缘由

  1. JDK 的 NIO 编程复杂。模型不友好,ByteBuffer 的 api 反人类;
  2. 没有实现线程模型,连自定义协议拆包都需要自己实现;
  3. JDK 的 NIO,bug 太多,维护成本过高。

所以,与之对应,Netty 的好处在于:

  1. Netty底层IO模型随意可切换;
  2. 自带拆包解包,异常检测等机制,只需要关心业务逻辑;
  3. Netty 的线程模型优化的好,精心设计的 Reactor 模型可以做到非常高效;
  4. 自带各种协议栈;
  5. 社区活跃、dubbo、hsf、kafka、metaQ都在使用,健壮性经受考验。

基本概念

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

从官网上的介绍可以看到,Netty 是一个网络应用框架,提供了异步事件驱动 的方式,使用 Netty 可以快速开发出一个高性能的网络应用程序,实际上,我们可以理解成 Netty 是一套在 java NIO 的基础上封装的便于用户开发网络应用程序的 api 。

整体架构

让我们来看看 Netty 的整体架构:

img

可以看到,Netty 主要分为三部分:

  1. 底层的零拷贝技术、事件驱动模型以及统一的通信模型—Reactor;
  2. 基于 JVM 实现的传输层;
  3. 常用协议的支持,如 HTTP、SSL、gzip等等。

重新实现 ByteBuffer

Netty 重新实现了 NIO 中的 ByteBuffer,可以更好的解决很多问题,例如半包、粘包。

半包:接收端将一个发送端的 ByteBuffer “拆开” 了,收到了多个破碎的包;

粘包:接收端收到了多个 ByteBuffer。

解决思路就是根据自定义协议,将数据读取到缓冲区进行二次拼装,重新组装得到我们应用层的数据包。

统一的 I/O 接口

Netty 有 Channel 这个统一的编程接口,所以可以在传输层进行实现的切换,例如可以将 Socket 切换成 Datagram,还可以灵活的在 NIO 和 BIO 中进行切换,由于核心 API 具有高度的可扩展性,可以很容易的定制自己的传输实现。

高级组件

Netty 提供了一系列高级组件,如 SSL/TLS 支持、HTTP实现等等。

基于拦截链模式的事件模型

一个定义良好并具有扩展能力的事件模型是事件驱动开发的必要条件。Netty 具有定义良好的 I/O 事件模型。由于严格的层次结构区分了不同的事件类型,因此 Netty 也允许你在不破坏现有代码的情况下实现自己的事件类型。这是与其他框架相比另一个不同的地方。很多 NIO 框架没有或者仅有有限的事件模型概念;在你试图添加一个新的事件类型的时候常常需要修改已有的代码,或者根本就不允许你进行这种扩展。

这块暂时没啥体感…

异步事件驱动 — Reactor 模型

主要参考:https://cloud.tencent.com/developer/article/1488120

这里采用的“异步事件驱动”,就是我们上文讲的 IO 复用技术,也可以换个名字,就是 reactor 模型。也称为 Dispatcher 模型,也可以称为同步非阻塞模型。

有人可能会奇怪,这里明明谈到的是 “异步事件驱动”,怎么又是 “同步非阻塞模型”,这里我就要稍微解释一下了,在 “同步非阻塞” 中,其实也有两种形式,一种是非事件驱动,线程主动去询问数据是否准备好了「select」,另外一种呢,就是事件驱动,数据准备好了之后主动告知对应的线程「epoll」,这里的 “异步” ,指的就是线程无需主动的去询问数据是否准备好了,而是可以去干其他事情,此时的线程和数据准备的确是异步的。

举个生活中更常见的例子,一个是:

如果你想吃一份卤肉饭,在饭馆点完餐,就去遛狗了。不过遛一会儿,就回饭馆喊一声:好了没啊!—– 非事件驱动

如果你想吃一份卤肉饭,在饭馆点完餐,就去遛狗了,遛狗的时候,接到饭馆电话,说饭做好了,让您亲自去拿。— 事件驱动

这里能看到我们在遛狗的时候无需回去问饭是否做好了,异步的概念就很明显了!

而同步非阻塞,则是从整个IO传输的角度看,在数据准备就绪之后,依旧需要内核来调度新的线程来进行IO传输,这样明显是同步的,所以说,二者的同步异步的范围不一样,一个是仅仅聚焦于IO准备就绪,一个是聚焦于整个的IO传输。

概述

Reactor模型中定义的三种角色:

  • Reactor:负责监听和分配事件,将I/O事件分派给对应的Handler。新的事件包含连接建立就绪、读就绪、写就绪等。也就是上文代码中的 “select” 角色;
  • Acceptor:处理客户端新连接,并分派请求到处理器链中。就是上文代码中触发 “accept” 语义。
  • Handler:将自身与事件绑定,执行非阻塞读/写任务,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。可用资源池来管理。就是上文代码中触发 “read”语义。

Reactor处理请求的流程:

读取操作:

  1. 应用程序注册读就绪事件和相关联的事件处理器;
  2. 事件分离器等待事件的发生;
  3. 当发生读就绪事件的时候,事件分离器调用第一步注册的事件处理器;

写入操作类似于读取操作,只不过第一步注册的是写就绪事件。

单 Reactor 单线程模型

Reactor线程负责多路分离套接字,accept新连接,并分派请求到handler。Redis使用单Reactor单进程的模型。

消息处理流程:

  1. Reactor对象通过select监控连接事件,收到事件后通过dispatch进行转发。
  2. 如果是连接建立的事件,则由acceptor接受连接,并创建handler处理后续事件。
  3. 如果不是建立连接事件,则Reactor会分发调用Handler来响应。
  4. handler会完成read->业务处理->send的完整业务流程。

这里的 handler 没有异步,所以对于高负载、大并发的应用场景并不合适。

image-20200614193322055

Tip: 这里照应上面的例子,单 Reactor 单线程模型其实就相当于只有一个专门负责接待的服务员,他不仅要负责询问我们是否要进店「注册套接字,绑定端口」,还要负责告诉引导员将顾客带入相应的座位「acceptor接受连接,准备进行读取」,最后还要负责通知上菜员上菜「handler,传输数据」

单 Reactor 多线程模型

与单线程模型相比,这里引入了 worker 线程池进行业务流程的处理。

img

消息处理流程:

  1. Reactor对象通过Select监控客户端请求事件,收到事件后通过dispatch进行分发。
  2. 如果是建立连接请求事件,则由acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接完成后续的各种事件。
  3. 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应。
  4. Handler只负责响应事件,不做具体业务处理,通过Read读取数据后,会分发给后面的Worker线程池进行业务处理。
  5. Worker线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给Handler进行处理。
  6. Handler收到响应结果后通过send将响应结果返回给Client。
Tip: 这里照应上面的例子,单 Reactor 多线程模型其实就相当于最后拿走菜单的那个人回到厨房,然后有很多人上菜。

可以看到,这里的 reactor、accept、handler都专注于分发了,业务处理交由 worker 线程池去处理了,这样整个系统的吞吐就上去了,但是,这个模型还是存在两个比较明显的问题:

  1. 如果子线程完成业务处理后,把结果传递给主线程Reactor进行发送,就会涉及共享数据的互斥和保护机制。
  2. Reactor承担所有事件的监听和响应,只在主线程中运行,可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。
一句话来说,就是专门负责接待的服务员太忙了,所有通知的活都要他来干…

主从 Reactor 多线程模型

比起第二种模型,它是将Reactor分成两部分:

  1. mainReactor负责监听server socket,用来处理网络IO连接建立操作,将建立的socketChannel指定注册给subReactor。
  2. subReactor「这里也可以是多线程」主要做和建立起来的socket做数据交互和事件业务处理操作。通常,subReactor个数上可与CPU个数等同。
通俗点来说,就是负责接待的服务员只需要站在门口,问别人是否要进来就餐,剩下的事情,如将其引导到相应的位置「相当于建立连接」、告知顾客菜全部上完了「相当于worker干完了,需要通知给client」,就由 subReactor来全部完成了。这也对应了我们现在饭店的模型:有专门的一个人在门口负责询问顾客是否要进来就餐,顾客进入餐厅后,有专人负责引导到座位,顾客点餐完后,对应的负责人看到后通知后厨做菜,然后有多人分别给顾客端菜,最后菜上齐了,负责人告知顾客上齐了,最后引导顾客结账…

image-20200614195614566

Netty 线程模型

Netty的线程模型,就是主从 Reactor 多线程模型,当然了,Netty 可以在以上三种 Reactor 模型之间进行切换。

扩充:AIO 中使用的 Proactor 模型

由图中可以看到:

  1. Procator Initiator负责创建Procator和Handler,并将Procator和Handler都通过Asynchronous operation processor注册到内核。
  2. Asynchronous operation processor负责处理注册请求,并完成IO操作。完成IO操作后会通知procator。
  3. procator根据不同的事件类型回调不同的handler进行业务处理。handler完成业务处理,handler也可以注册新的handler到内核进程。

Proactor模型

Reactor 实现了一个被动的事件分离和分发模型,服务等待请求事件的到来,再通过不受间断的同步处理事件,从而做出反应,适用于同时接收多个服务请求,并且依次同步的处理它们的事件驱动程序;

Proactor 实现了一个主动的事件分离和分发模型;这种设计允许多个任务并发的执行,从而提高吞吐量;并可执行耗时长的任务(各个任务间互不影响),异步接收和同时处理多个服务请求的事件驱动程序。

Proactor 实现起来比较复杂,因为需要内核的深度参与,同时缓冲区在读或写操作的时间段内必须保持住,可能造成持续的不确定性,并且每个并发操作都要求有独立的缓存,相比Reactor模型,在Socket已经准备好读或写前,是不要求开辟缓存的。现在高并发网络编程基本上还是都采用 Reactor 模型。

从 Netty 看 HSF/Dubbo

有了 Netty,我们就可以自己去实现一个 rpc 框架了,其实最为著名的就是 HSF、Dubbo了,二者都是采用了 Netty + Hessian。

HSF 框架中采用如今流行的网络通信框架 Netty 加上 Hessian 数据序列化协议实现 HSF 服务间的交互,主要考虑点是在大并发量时,服务交互性能达到最佳。这类 RPC 协议采用多路复用的 TCP 长连接方式,在服务提供者和调用者间有多个服务请求同时调用时会共用同一个长连接,即一个连接交替传输不同请求的字节块。它既避免了反复建立连接开销,也避免了连接的等待闲置从而减少了系统连接总数,同时还避免了 TCP 顺序传输中的线头阻塞(head-of-line blocking)问题

Hessian 是 HSF 框架中默认使用的数据序列化协议,在数据量较小时性能表现出众,Hessian 的优点是精简高效,同时可以跨语言使用,目前支持 Java, C++, .net, Python, ruby 等语言。另外 Hessian 可以充分利用 Web 容器的成熟功能,在处理大量用户访问时很有优势,在资源分配、线程排队、异常处理等方面都可以由 Web 容器保证。

HSF 框架同时也支持切换使用 Java 序列化,Hession 相比 JDK 标准的序列化方式(即基于 Serializable 接口的标准序列化),在典型场景中,其序列化时间开销可能缩短 20 倍。虽然 Hessian 不是最快的序列化协议,但它对于复杂业务对象的序列化正确率、准确性相较于最稳定的 Java 序列化并不逊色太多。

业界还有一些比 Hessian 更快的序列化协议,但它们相对于 Hessian 在复杂场景下的处理能力还是会差一些,所以 Hessian 是在性能和稳定性同时考虑下最优的序列化协议。

阿里巴巴当时在对多种通信协议和数据序列化组件等测试中,Netty + Hession 的组合在互联网高并发量的场景下,特别是在 TPS 上达到 10w 以上时,性能和效率远比 REST 或者 Web Service 高。

从 Netty 看 kafka/metaQ

主要引用:

kafka之所以快的六大原因

讲到了kafka快的两个原因:页缓存技术+磁盘顺序写+零拷贝

Kafka中的NIO网络通信模型

先来回答那个老生常谈的问题,kafka为何这么吞吐量大而且还快?

我认为主要是以下几点:

  1. 使用了 NIO 的网络 I/O 通信模型,就是 I/O 复用,所以减去了等待数据准备的时间,加快速度;
  2. 使用页缓存技术「Page Cache」磁盘顺序写以及 sendfile 的零拷贝技术加快落盘速度和传送速度,因为 Kafka 中存在大量的网络数据持久化到磁盘和磁盘文件通过网络发送的过程;
  3. 分区分段 + 索引,批量读写、批量压缩。

kafka 中的 NIO 网络通信模型

Kafka的网络通信模型是基于NIO的主从Reactor多线程模型来设计的。就是我在 Netty 一节中提及到的,这里再次来回顾一次,先引用Kafka源码中注释的一段话:

An NIO socket server. The threading model is 1 Acceptor thread that handles new connections. Acceptor has N Processor threads that each have their own selector and read requests from sockets. M Handler threads that handle requests and produce responses back to the processor threads for writing.

相信大家看了上面的这段引文注释后,大致可以了解到Kafka的网络通信层模型,主要采用了1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程)。下面的表格简要的列举了下(这里先简单的看下后面还会详细说明):『Acceptor 就是 NIO 中的 Selector』

线程数 线程名 线程具体说明
1 kafka-socket-acceptor_%x Acceptor线程,负责监听Client端发起的请求
N kafka-network-thread_%d Processor线程,负责对Socket进行读写
M kafka-request-handler-_%d Worker线程,处理具体的业务逻辑并生成Response返回

Kafka网络通信层的完整框架图如下图所示:

kafka网络通信框架图

这里可以简单总结一下其网络通信模型中的几个重要概念:

(1)Acceptor:1个接收线程,负责监听新的连接请求,同时注册OP_ACCEPT 事件,将新的连接按照“round robin”方式交给对应的 Processor 线程处理;

(2)Processor:N个处理器线程,其中每个 Processor 都有自己的 selector,它会向 Acceptor 分配的 SocketChannel 注册相应的 OP_READ 事件,N 的大小由“num.networker.threads”决定;

(3)KafkaRequestHandler:M个请求处理线程,包含在线程池—KafkaRequestHandlerPool内部,从RequestChannel的全局请求队列—requestQueue中获取请求数据并交给KafkaApis处理,M的大小由“num.io.threads”决定;

(4)RequestChannel:其为Kafka服务端的请求通道,该数据结构中包含了一个全局的请求队列 requestQueue和多个与Processor处理器相对应的响应队列responseQueue,提供给Processor与请求处理线程KafkaRequestHandler和KafkaApis交换数据的地方;

(5)NetworkClient:其底层是对 Java NIO 进行相应的封装,位于Kafka的网络接口层。Kafka消息生产者对象—KafkaProducer的send方法主要调用NetworkClient完成消息发送;

(6)SocketServer:其是一个NIO的服务,它同时启动一个Acceptor接收线程和多个Processor处理器线程。提供了一种典型的Reactor多线程模式,将接收客户端请求和处理请求相分离;

(7)KafkaServer:代表了一个Kafka Broker的实例;其startup方法为实例启动的入口;

(8)KafkaApis:Kafka的业务逻辑处理Api,负责处理不同类型的请求;比如“发送消息”“获取消息偏移量—offset”“处理心跳请求”等;

Page Cache(页缓存技术)

什么是页缓存

操作系统本身有一层缓存,叫做page cache,是在内存里的缓存,我们也可以称之为os cache,意思就是操作系统自己管理的缓存。

好处是什么

这种方式相当于读写内存,不是在读写磁盘,个人觉得有点类似于直接将磁盘和内核缓冲区放一块了「我们平常的 I/O 就是用 DMA 将 内核缓冲区和磁盘连接」,既然是读写内存,好处就是很快很快。

如何使用页缓存

你在写磁盘文件的时候,可以直接写入os cache 中,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把os cache 里的数据真的刷入到磁盘中「刷盘其实就是用直接在末尾追加,也快的一批」。

image-20200617105401912

为何不使用 jvm 中的内存

  1. 避免Object消耗:如果是使用 Java 堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多;

  2. 避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题;

  3. 操作系统对页缓存有很多优化,提供了 write-behind、read-ahead以及flush等多种机制。

磁盘顺序写

磁盘顺序写,仅仅将数据追加到文件的末尾(append),而不是在文件的随机位置来修改数据。减去了寻址的时间,性能甚至比内存随机读写要快,这样就极大的加快了 os cache 落盘的速度,吞吐量更大了。

但是,这种方法有一个缺陷—— 没有办法删除数据 ,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个Topic都有一个offset用来表示读取到了第几条数据 。

如果不删除硬盘肯定会被撑满,所以Kakfa提供了两种策略来删除数据。一是基于时间,二是基于partition文件大小。

sendfile 的零拷贝技术

允许操作系统将数据从Page Cache 直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区, 这样避免重新复制数据 。通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。

kafka中的零拷贝

这里拓展一下:

RocketMQ 选择了 mmap + write 这种零拷贝方式,适用于业务级消息这种小块文件的数据持久化和传输;而 Kafka 采用的是 sendfile 这种零拷贝方式,适用于系统日志消息这种高吞吐量的大块文件的数据持久化和传输。但是值得注意的一点是,Kafka 的索引文件使用的是 mmap + write 方式,数据文件使用的是 sendfile 方式。

kafka vs RocketMQ

分区分段 + 索引

  • kafka中的 message 是按 topic 存储的,每个 topic 又可以有多个分区 partition「每个partition 都有一个 leader,若干个 follower」,每个partition对应了操作系统上的一个文件夹「也就是单个broker上」,然后 partition 又由多个 segment 组成,分区分段,这样扫描起来性能更快,因为操作文件只需要操作一个 segment 就可以了,细粒度化了;
  • 为了更快的检索 segment,使用了索引文件,而为了索引文件更快的读取,又采用了零拷贝技术中的 mmap + write。「是不是挺牛逼!」

这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。

批量读写

Kafka数据读写也是批量的而不是单条的。

除了利用底层的技术外,Kafka还在应用程序层面提供了一些手段来提升性能。最明显的就是使用批次。在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。

批量压缩

在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑,支持多种压缩,我自己有在项目中使用 gzip,的确是有加速,并且消费端无需自己解压,自动解压。

总结

kafka快的原因:「从流程上说」

Producer 发送消息时进行批量的发送并且批量压缩,减轻 网络 I/O 压力,然后是使用了 NIO 中的 网络I/O 模型,采用 I/O 复用机制,用 Selector 去轮询是否准备好了发送的数据,避免的其他线程的无效等待,等到数据传送到服务器时,会首先用索引去找到对应的segment,索引文件采用了 mmap + write 减少 cpu 拷贝次数加快访问速度,找到对应的 segment 之后,会将数据发送到 Page Cache,Page Cache 会在合适的时候进行刷盘,刷盘是顺序读写磁盘,所以刷盘也很快,然后消费者要消费时,利用 sendfile 的零拷贝技术直接从 Page Cache 发送到 SocketChannel,无需进行多余的拷贝和上下文切换,直接在内核态中完成,自此,数据发送给客户端。

Thank you for your accept. mua!
-------------本文结束感谢您的阅读-------------